
use once_cell::sync::OnceCell;
use std::sync::{RwLock, RwLockWriteGuard, RwLockReadGuard, Arc};
use std::collections::HashMap;
use crate::source::source_add_next;
use tokio::runtime::Runtime;
use futures::{future, Future, Stream};
use crate::util::{AsyncSend, async_channel_new, AsyncRecv};
use async_channel::Receiver;
use crate::logical_plan::work_node::util::ErrResult;
use crate::conf_init::{EventFlowNodeConfInstance, MapResult, ResultErr};

/// Debug switches of a work space, deserialized as part of `WorkSpaceConf`.
///
/// Every switch is an `Option<bool>`. The code that reads these flags is not
/// in this file, so how `None` is treated there should be confirmed against
/// the consumers.
#[derive(Deserialize,Clone)]
pub struct WorkSpaceDebug{
    /// The path an event takes as it flows through the work_nodes.
    pub event_flow_path: Option<bool>,
    /// Runtime errors raised while a work_node processes or evaluates an
    /// event.
    pub run_err: Option<bool>,
    /// Observability metrics:
    ///
    /// * three timestamps related to event processing:
    ///   1. when the event entered the work_node's inbound queue;
    ///   2. when processing of the event started;
    ///   3. when the node finished the event and was about to send it to the
    ///      next node;
    /// * count of events entering the work_node and count of events leaving
    ///   it;
    /// * CPU clock-cycle statistics of the work_node.
    pub observability_indicator: Option<bool>,

}

/// Configuration of a single work space, as deserialized from the node
/// configuration.
#[derive(Deserialize)]
pub struct WorkSpaceConf{
    /// Work space name; used as the registry key and as the suffix of the
    /// runtime's worker-thread names.
    name: String,
    /// Names passed to `source_add_next` to obtain one receiver per input.
    input: Vec<String>,
    /// Number of worker threads of the work space's Tokio runtime.
    threads: usize,
    /// Debug switches, shared with callers through `work_space_ctl_get`.
    debug: WorkSpaceDebug,
}

/// The data-plane half of a work space: its inputs and its downstream
/// outputs. It is consumed by `work_space_run` when forwarding starts.
pub struct WorkSpace{
    /// Work space name (copied from the configuration).
    name: String,
    /// Receiving half of the control channel created at initialisation; it is
    /// not read anywhere in this file.
    ctl_recv: AsyncRecv,
    /// Input name -> receiver returned by `source_add_next`.
    input: HashMap<String, AsyncRecv>,
    /// Downstream name -> sender registered through `work_space_add_next`.
    next: HashMap<String, AsyncSend>,
}
/// Registry entry of a work space: its runtime, control channel, debug
/// switches and, until it has been started, the work space itself.
pub struct WorkSpaceCtl{
    /// Work space name (same value as the registry key).
    name: String,
    /// Tokio runtime dedicated to this work space; handed out by
    /// `work_space_ctl_get` and used to spawn the forwarding tasks.
    runtime: Arc<Runtime>,
    /// Sending half of the control channel; it is not used anywhere in this
    /// file.
    ctl_send: AsyncSend,
    /// `Some` from initialisation until `work_space_run` takes it, `None`
    /// afterwards.
    work_space: Option<WorkSpace>,
    /// Debug switches cloned from the configuration.
    debug: Arc<WorkSpaceDebug>,
}


lazy_static!{
    // Process-wide registry of work spaces keyed by name. It is built from
    // `EventFlowNodeConfInstance.work_space` on first access; if that fails
    // the error is logged and the process exits with status -1.
    static ref WORK_SPACE_CTL_LIST: RwLock<HashMap<String, WorkSpaceCtl>> = {
        let built = work_space_init_from_conf(&EventFlowNodeConfInstance.work_space);
        match built{
            MapResult::Err(cause) => {
                error!("work_space_init_from_conf failed, [{:?}]", cause);
                std::process::exit(-1)
            },
            MapResult::Ok(registry) => registry,
        }
    };
}

/// Builds the dedicated multi-threaded Tokio runtime of one work space.
///
/// Worker threads are named `work_space_<work_space_name>` and the pool is
/// created with `threads` workers.
///
/// # Errors
///
/// Returns a `ResultErr` when `threads` is zero or when Tokio fails to build
/// the runtime.
fn work_space_runtime_build(work_space_name: &str, threads: usize)->MapResult<Runtime>{
    // Tokio's builder panics on a zero worker count; since the value comes
    // straight from configuration, report it as an ordinary error instead.
    if threads == 0{
        let err_desc = format!("work_space [{}] runtime build failed [threads must be greater than 0]",
                               work_space_name);
        return MapResult::Err(ResultErr::new(err_desc));
    }

    let built = tokio::runtime::Builder::new_multi_thread()
        .thread_name(format!("work_space_{}", work_space_name))
        .worker_threads(threads)
        .build();

    match built{
        Ok(runtime) => MapResult::Ok(runtime),
        Err(e) => {
            let err_desc = format!("work_space [{}] runtime build failed [{:?}]",
                                   work_space_name, e);
            MapResult::Err(ResultErr::new(err_desc))
        },
    }
}

/// Builds the work space registry described by `conf`.
///
/// For every configured work space this obtains one receiver per input via
/// `source_add_next`, builds the work space's runtime, creates its control
/// channel and stores the resulting `WorkSpaceCtl` under the work space name.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by `source_add_next` or
/// `work_space_runtime_build`.
pub fn work_space_init_from_conf(conf: &Vec<WorkSpaceConf>)->MapResult<RwLock<HashMap<String, WorkSpaceCtl>>>{
    let mut registry = HashMap::new();

    for space_conf in conf{
        let space_name = &space_conf.name;

        // One receiver per configured input, keyed by the input's name.
        let mut inputs = HashMap::new();
        for source_name in &space_conf.input{
            let receiver = source_add_next(source_name, space_name)?;
            inputs.insert(source_name.to_string(), receiver);
        }

        let runtime = work_space_runtime_build(space_name, space_conf.threads)?;
        let (ctl_send, ctl_recv) = async_channel_new();

        let ctl = WorkSpaceCtl{
            name: space_name.to_string(),
            runtime: Arc::new(runtime),
            ctl_send,
            // Downstream senders are added later through `work_space_add_next`.
            work_space: Some(WorkSpace{
                name: space_name.to_string(),
                ctl_recv,
                input: inputs,
                next: HashMap::new(),
            }),
            debug: Arc::new(space_conf.debug.clone()),
        };

        // NOTE(review): a second entry with the same name replaces the first
        // one without any diagnostic — confirm whether duplicates should be
        // rejected.
        registry.insert(space_name.to_string(), ctl);
    }

    MapResult::Ok(RwLock::new(registry))
}

/// Registers `next_name` as a downstream of the work space `work_space_name`
/// and returns the receiving half of the newly created channel.
///
/// The sending half is stored in the work space's `next` map under
/// `next_name`, replacing any sender previously stored under that name.
///
/// # Errors
///
/// Returns a `ResultErr` when the registry lock cannot be taken without
/// blocking, when no work space has that name, or when the work space has
/// already been handed to `work_space_run`.
pub fn work_space_add_next(work_space_name: &str, next_name: &str)->MapResult<AsyncRecv>{
    let mut registry = match WORK_SPACE_CTL_LIST.try_write(){
        Err(e) => {
            let err_desc = format!("WORK_SPACE_CTL_LIST try_write failed {:?}", e);
            return MapResult::Err(ResultErr::new(err_desc));
        },
        Ok(guard) => guard,
    };

    let ctl = match registry.get_mut(work_space_name){
        Some(ctl) => ctl,
        None => {
            let err_desc = format!("work_space [{}] add next, but not exist", work_space_name);
            return MapResult::Err(ResultErr::new(err_desc));
        },
    };

    match ctl.work_space.as_mut(){
        Some(work_space) => {
            let (sender, receiver) = async_channel_new();
            work_space.next.insert(next_name.to_string(), sender);
            MapResult::Ok(receiver)
        },
        None => {
            let err_desc = format!("work_space [{}] add next, but work_space_ctl work_space is None",
                                   work_space_name);
            MapResult::Err(ResultErr::new(err_desc))
        },
    }
}

/// Returns shared handles to the runtime and the debug switches of the work
/// space named `work_space_name`.
///
/// This is a pure lookup, so it takes only a shared (read) lock on the
/// registry and does not fail merely because other readers hold the lock.
///
/// # Errors
///
/// Returns a `ResultErr` when the read lock cannot be taken without blocking
/// (a writer holds it, or the lock is poisoned) or when no work space has
/// that name.
pub fn work_space_ctl_get(work_space_name: &str)->MapResult<(Arc<Runtime>,Arc<WorkSpaceDebug>)>{
    let work_space_ctl_list = match WORK_SPACE_CTL_LIST.try_read(){
        Ok(t) => t,
        Err(e) => {
            let err_desc = format!("WORK_SPACE_CTL_LIST try_read failed {:?}", e);
            return MapResult::Err(ResultErr::new(err_desc));
        },
    };

    match work_space_ctl_list.get(work_space_name){
        Some(work_space_ctl) => MapResult::Ok((
            Arc::clone(&work_space_ctl.runtime),
            Arc::clone(&work_space_ctl.debug),
        )),
        None => {
            let err_desc = format!("work_space [{}] runtime, but not exist", work_space_name);
            MapResult::Err(ResultErr::new(err_desc))
        },
    }
}

/// Starts forwarding for every registered work space.
///
/// For each input of each work space a future is spawned on that work
/// space's runtime; it receives data from the input and sends it to all of
/// the work space's `next` outputs. The `WorkSpace` is taken out of its
/// `WorkSpaceCtl` in the process, so a work space can be started only once.
///
/// # Errors
///
/// Returns a `ResultErr` when the registry lock cannot be taken without
/// blocking, or as soon as a work space is found whose `work_space` is
/// already `None`; work spaces visited before that one stay started.
pub fn work_space_run()->MapResult<()>{
    let mut registry = match WORK_SPACE_CTL_LIST.try_write(){
        Err(e) => {
            let err_desc = format!("WORK_SPACE_CTL_LIST try_write failed {:?}", e);
            return MapResult::Err(ResultErr::new(err_desc));
        },
        Ok(guard) => guard,
    };

    for (space_name, ctl) in registry.iter_mut(){
        let runtime = Arc::clone(&ctl.runtime);
        match ctl.work_space.take(){
            Some(work_space) => work_space_each(runtime, work_space),
            None => {
                let err_desc = format!("work_space [{}] run, but is None", space_name);
                return MapResult::Err(ResultErr::new(err_desc));
            },
        }
    }

    MapResult::Ok(())
}
/// Forwards everything received on `input` to every sender in `next`.
///
/// Each received item is cloned once per downstream sender, and the sends
/// are awaited one after another. The loop has no exit, so the future never
/// completes on its own.
async fn work_space_input_each(input: AsyncRecv, next: HashMap<String, AsyncSend>){
    loop{
        let data = match input.recv().await{
            Some(received) => received,
            // NOTE(review): if `None` means the channel is closed, this
            // retries forever without yielding anything — confirm the
            // semantics of `AsyncRecv::recv`.
            None => continue,
        };

        for sender in next.values(){
            sender.send(data.clone()).await;
        }
    }
}
/// Spawns one forwarding task per input of `work_space` on `runtime`.
///
/// Every task gets its own clone of the input receiver and of the whole
/// `next` sender map. The returned join handles are discarded, so the tasks
/// run detached. `work_space` itself is dropped when this function returns.
fn work_space_each(runtime: Arc<Runtime>, work_space: WorkSpace){
    for receiver in work_space.input.values(){
        let forwarder = work_space_input_each(receiver.clone(), work_space.next.clone());
        runtime.spawn(forwarder);
    }
}
